package com.lianda.udf;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 * Examples of the three kinds of user-defined functions in the Flink Table API:
 *
 * UDF  — user-defined scalar function: one row in, one value out.
 *
 * UDAF — user-defined aggregate function: many rows in, one value out.
 *
 * UDTF — user-defined table function: one row in, multiple rows out,
 *        or one column in, multiple columns out.
 */
@Slf4j
public class UDFMain {
    /**
     * Entry point: reads JSON browse events from Kafka, registers them as a
     * table with an event-time attribute, registers the UDF/UDTF/UDAF examples,
     * and runs a SQL query that splits eventTime into date/time columns.
     *
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {

        // 1. Job parameters: Kafka broker address, source topic, consumer group id.
        String kafkaBootstrapServers = "localhost:9092";
        String browseTopic = "user_action";
        String browseTopicGroupID = "udf-user-action-test";

        // 2. Execution environment: Blink planner in streaming mode, event time.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        env.setParallelism(1);

        // 3. Kafka source: parse each JSON record into a UserBrowseLog, then
        //    assign event-time timestamps/watermarks with 5s max out-of-orderness.
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
        browseProperties.put("group.id",browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream = env
                .addSource(new FlinkKafkaConsumer011<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction())
                .assignTimestampsAndWatermarks(new BrowseBoundedOutOfOrdernessTimestampExtractor(Time.seconds(5)));

        // Register the stream as a table. The extra "rowtime.rowtime" field exposes
        // the stream's event time as a rowtime attribute usable in windowed SQL.
        tableEnv.registerDataStream("source_kafka",
                browseStream,
                "userID,eventTime,eventTimeTimestamp,eventType,productID,productPrice,rowtime.rowtime");

        // 4a. UDF (scalar): converts a Flink window start/end Timestamp into a
        //     formatted string in a target zone (defaults to Beijing time, +08:00).
        tableEnv.registerFunction("UDFTimestampConverter", new UDFTimestampConverter());

        // 4b. UDTF (table function): splits one column into two columns.
        tableEnv.registerFunction("UDTFOneColumnToMultiColumn",new UDTFOneColumnToMultiColumn());

        // 4c. UDAF (aggregate): sums productPrice.
        tableEnv.registerFunction("UDAFSum", new UDAFSum());

        // 5. SQL. The commented-out query computes, per 10s tumbling event-time
        //    window (maxOutOfOrderness = 5s), the price sum for each product.
        //    NOTE(review): 'YYYY' is the week-based year in java.time patterns;
        //    it should likely be 'yyyy' if this query is re-enabled.
//        String sql = "	select "
//                + "		UDFTimestampConverter(TUMBLE_START(rowtime, INTERVAL '10' SECOND),'YYYY-MM-dd HH:mm:ss') as window_start, "
//                + "		UDFTimestampConverter(TUMBLE_END(rowtime, INTERVAL '10' SECOND),'YYYY-MM-dd HH:mm:ss','+08:00') as window_end, "
//                + "		productID, "
//                + "		UDAFSum(productPrice) as sumPrice"
//                + "	from source_kafka "
//                + "	group by productID,TUMBLE(rowtime, INTERVAL '10' SECOND)";



        // UDTF query: lateral table join — each source row is joined with the rows
        // produced by the UDTF, splitting eventTime into date1 and time1 columns.
        String sqlUDTF = "select "
                + "	userID, eventTime, eventTimeTimestamp, eventType, productID, productPrice, rowtime, date1, time1 "
                + "from source_kafka ,"
                + "lateral table(UDTFOneColumnToMultiColumn(eventTime)) as T(date1,time1)";

        Table table = tableEnv.sqlQuery(sqlUDTF);
        tableEnv.toAppendStream(table, Row.class).print();

        // 6. Submit the job.
        // NOTE(review): the pipeline ends in a converted DataStream sink
        // (toAppendStream(...).print()); confirm tableEnv.execute() triggers that
        // part on this Flink version — env.execute() may be required instead.
        tableEnv.execute(UDFMain.class.getSimpleName());
    }

    /**
     * 自定义UDTF
     * 将一列变成两列。
     * 如:2019-12-01 10:02:06 转换成date1(2019-12-01)和time1(10:02:06)两列。
     */
    /**
     * UDTF (table function): splits one column into two.
     * e.g. "2019-12-01 10:02:06" becomes date1 ("2019-12-01") and time1 ("10:02:06").
     */
    public static class UDTFOneColumnToMultiColumn extends TableFunction<Row> {
        /**
         * Splits {@code value} on the first space and emits a (date1, time1) row.
         * Null values or values without a space are skipped instead of throwing
         * ArrayIndexOutOfBoundsException, so one malformed record cannot fail the job.
         *
         * @param value raw event time string, expected "yyyy-MM-dd HH:mm:ss"
         */
        public void eval(String value) {
            if (value == null) {
                return; // nothing to emit for a null column
            }
            // limit = 2: split only at the first space so any further spaces stay
            // inside the time column instead of being silently dropped
            String[] parts = value.split(" ", 2);
            if (parts.length < 2) {
                return; // no delimiter — cannot produce two columns, skip the record
            }

            // one row, two columns
            Row row = new Row(2);
            row.setField(0, parts[0]);
            row.setField(1, parts[1]);
            collect(row);
        }

        /** Declares the produced row type: two STRING columns. */
        @Override
        public TypeInformation<Row> getResultType() {
            return new RowTypeInfo(Types.STRING, Types.STRING);
        }
    }

    /**
     * UDAF (aggregate function): accumulates int product prices into a long sum.
     */
    public static class UDAFSum extends AggregateFunction<Long, UDAFSum.SumAccumulator> {
        /**
         * Accumulator holding the intermediate aggregation state: the running sum.
         */
        public static class SumAccumulator{
            public long sumPrice;
        }

        /** Creates a fresh accumulator; the long field defaults to 0. */
        @Override
        public SumAccumulator createAccumulator() {
            return new SumAccumulator();
        }

        /**
         * Folds one input value into the accumulator.
         *
         * @param accumulator  current intermediate state
         * @param productPrice price of the observed product
         */
        public void accumulate(SumAccumulator accumulator, int productPrice){
            accumulator.sumPrice = accumulator.sumPrice + productPrice;
        }

        /** Returns the final aggregated sum. */
        @Override
        public Long getValue(SumAccumulator acc) {
            return acc.sumPrice;
        }
    }

    /**
     * UDF (scalar function): converts a Flink window start/end {@link Timestamp}
     * (interpreted as UTC) into a formatted string in a target time zone.
     */
    public static class UDFTimestampConverter extends ScalarFunction {

        /**
         * Converts to Beijing time (UTC+08:00) by default.
         *
         * @param timestamp window timestamp, interpreted as UTC
         * @param format    pattern for DateTimeFormatter, e.g. "yyyy-MM-dd HH:mm:ss"
         * @return formatted local time in UTC+08:00
         */
        public String eval(Timestamp timestamp, String format){
            // Delegate to the 3-arg overload so the conversion logic lives in one place.
            return eval(timestamp, format, "+08:00");
        }

        /**
         * Converts to the given zone offset.
         *
         * @param timestamp  window timestamp, interpreted as UTC
         * @param format     pattern for DateTimeFormatter
         * @param zoneOffset target offset id, e.g. "+08:00"
         * @return formatted local time at the requested offset
         */
        public String eval(Timestamp timestamp, String format, String zoneOffset){
            // The wall-clock fields of the Timestamp represent UTC; attach the UTC
            // zone first, then shift to the same instant at the requested offset.
            ZonedDateTime utcZoneDateTime = timestamp.toLocalDateTime().atZone(ZoneId.of("UTC"));
            ZonedDateTime targetZoneDateTime = utcZoneDateTime.withZoneSameInstant(ZoneId.of(zoneOffset));
            return targetZoneDateTime.format(DateTimeFormatter.ofPattern(format));
        }
    }

    /**
     * 解析Kafka数据
     */
    /**
     * Parses one JSON record from Kafka into a {@link UserBrowseLog} and enriches
     * it with a millisecond event-time timestamp. Malformed records are logged and
     * dropped so a single bad message cannot fail the job.
     */
    public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        // eventTime arrives as "yyyy-MM-dd HH:mm:ss" in Beijing time (UTC+08:00).
        // DateTimeFormatter and ZoneOffset are immutable and thread-safe, so build
        // them once instead of on every record.
        private static final DateTimeFormatter EVENT_TIME_FORMAT =
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        private static final ZoneOffset BEIJING_OFFSET = ZoneOffset.of("+08:00");

        /**
         * @param value raw JSON payload of one Kafka record
         * @param ctx   process-function context (unused)
         * @param out   collector receiving the parsed, timestamped log
         */
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            try {
                UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
                // Convert the Beijing-time event string to an epoch-millis timestamp
                // for use by the downstream watermark extractor.
                long eventTimeTimestamp = LocalDateTime
                        .parse(browseLog.getEventTime(), EVENT_TIME_FORMAT)
                        .atOffset(BEIJING_OFFSET)
                        .toInstant()
                        .toEpochMilli();
                browseLog.setEventTimeTimestamp(eventTimeTimestamp);
                out.collect(browseLog);
            } catch (Exception e) {
                // Best-effort parsing: log and drop the record ("Kafka parse error").
                log.error("解析Kafka数据异常...", e);
            }
        }
    }

    /**
     * Assigns event-time timestamps and periodic watermarks that lag the maximum
     * seen timestamp by a bounded out-of-orderness interval.
     */
    public static class BrowseBoundedOutOfOrdernessTimestampExtractor
            extends BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog> {
        /** @param maxOutOfOrderness how late an event may arrive and still be on time */
        BrowseBoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        /** Uses the epoch-millis timestamp precomputed by BrowseKafkaProcessFunction. */
        @Override
        public long extractTimestamp(UserBrowseLog element) {
            return element.getEventTimeTimestamp();
        }
    }


    /**
     * One user browse event as consumed from Kafka (fastjson-deserialized).
     * Field order matters: it defines the Lombok all-args constructor signature
     * and must match the field list passed to registerDataStream in main().
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserBrowseLog {
        // user identifier
        private String userID;
        // event time string, "yyyy-MM-dd HH:mm:ss" — presumably Beijing time; see BrowseKafkaProcessFunction
        private String eventTime;
        // epoch millis derived from eventTime; set after parsing, not present in the JSON payload itself
        private long eventTimeTimestamp;
        // event type label (e.g. browse) — TODO confirm value set with producer
        private String eventType;
        // product identifier
        private String productID;
        // product price as an integer amount
        private int productPrice;
    }
}
